Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

conduit: pipeline run loop implementation #1183

Merged
merged 6 commits into from
Aug 17, 2022

Conversation

Eric-Warehime
Copy link
Contributor

@Eric-Warehime Eric-Warehime commented Aug 15, 2022

Summary

I've gone through the initial implementation of the conduit pipeline run loop, and found/fixed a few bugs along the way.

Bug fixes/changes:

  • Configs were being loaded, but empty string were being passed into plugins during init--now we parse and load plugins with proper configs
  • The Processor interface was the only plugin interface not taking a logger, so I've added one
  • Additionally, I've made sure that all conduit loggers are properly being passed to Inits
  • Fixed import order in as many places as I saw
  • Rearranged ledger variable assignment in block_processor to avoid a null pointer exception
  • algod importer was not properly decoding config data--switched to using the protocol package to fix this issue
  • Fixed an issue where importing the genesis file in the ledger initializes the 0 block, but the same is not true for pgsql
    • We now no-op the 0 round in the ledger instead of failing on it.
  • Added a run loop for the pipeline--infinitely runs, exiting on any error

I know of a few more bugs that I wasn't able to address here that I will create tickets for.

Test Plan

This PR runs using local algod importer, (noop/block_processor) processors, and (noop/postgresql) exporters.

@Eric-Warehime Eric-Warehime added Not-Yet-Enabled Feature is not yet enabled at this time Skip-Release-Notes Reserved for PRs which do not need to be included in Release Notes labels Aug 15, 2022
@codecov
Copy link

codecov bot commented Aug 16, 2022

Codecov Report

❗ No coverage uploaded for pull request base (conduit@a0b6ecc). Click here to learn what that means.
The diff coverage is n/a.

@@            Coverage Diff             @@
##             conduit    #1183   +/-   ##
==========================================
  Coverage           ?   59.04%           
==========================================
  Files              ?       64           
  Lines              ?     8890           
  Branches           ?        0           
==========================================
  Hits               ?     5249           
  Misses             ?     3172           
  Partials           ?      469           

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Comment on lines 142 to 151
exporterLogger := log.New()
exporterLogger.SetFormatter(
PluginLogFormatter{
Formatter: &log.JSONFormatter{
DisableHTMLEscape: true,
},
Type: "Exporter",
Name: (*p.exporter).Metadata().Name(),
},
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have a helper to construct these.

)

jsonEncode := string(json.Encode(p.cfg.Exporter.Config))
err := (*p.exporter).Init(plugins.PluginConfig(jsonEncode), exporterLogger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you put a comment somewhere that this is imported first so that we can fetch the round?
Would we ever want multiple exporters, which round do we choose in that scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update w/ comment.

I think it's hard to enumerate all of possible combinations of multiple exporters. I'd prefer to make it really easy to run a pipeline so that people can just spin up multiple pipelines if they need to run multiple exporters.

}

func (p *pipelineImpl) Start() error {
p.logger.Infof("Starting Pipeline Initialization")

// TODO Need to change interfaces to accept config of map[string]interface{}

// Initialize Exporter first since the pipeline derives its round from the Exporter
exporterLogger := log.New()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this further, I think we might have a concurrency issue here. Created this ticket for us to follow up later: #1187

Copy link
Contributor

@winder winder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Eric-Warehime Eric-Warehime merged commit 9cc3d36 into algorand:conduit Aug 17, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Not-Yet-Enabled Feature is not yet enabled at this time Skip-Release-Notes Reserved for PRs which do not need to be included in Release Notes Team Lamprey
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants